package ink.tsg.pool.runner;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.UUID;

/**
 * 数据计算解析后发送给EMQ
 * @title: DataComputeSendService
 * @Author zrj
 * @Date: 2021/10/27 16:30
 * @Version 1.0
 */
@Slf4j
@Component
public class DataComputeSendService implements ApplicationRunner {

	private Integer isSend = 1;

	private String clientName;

	private String url = "192.168.101.128";

	private Integer port = 1883;

	private String username = "admin";

	private String password ="public";

	private String topicName = "scada_workjob";

	private static MqttClient sendClient;

	@Override
	public void run(ApplicationArguments args) throws UnknownHostException {


		new Thread(() ->{
			while (true) {
				MqttClientOptions mqttClientOptions = new MqttClientOptions();
				mqttClientOptions.setClientId("测试");
				mqttClientOptions.setMaxMessageSize(1000000000);

				mqttClientOptions.setPassword(password);

				mqttClientOptions.setUsername(username);

				sendClient = MqttClient.create(Vertx.vertx(),mqttClientOptions);

				sendClient.connect(port, url, result -> {
					if(result.succeeded()) {
						log.info("scada数据发送连接EMQ服务器成功");
					}
				});
			}
		}).start();
		publishEmq();
	}

	public void publishEmq(){
		if(sendClient != null && sendClient.isConnected()) {
			sendClient.publish(topicName, Buffer.buffer(JSON.toJSONString("2022-05-13 14:17:59.122  INFO 21604 --- [ntloop-thread-0] i.t.pool.runner.DataComputeSendService   : scada数据发送连接EMQ服务器成功\n" +
					"2022-05-13 14:18:05.819  INFO 21604 --- [ntloop-thread-0] io.vertx.mqtt.impl.MqttClientImpl        : Connection with 192.168.101.128:1883 established successfully\n" +
					"2022-05-13 14:18:07.880  INFO 21604 --- [ntloop-thread-0] i.t.pool.runner.DataComputeSendService   : scada数据发送连接EMQ服务器成功\n" +
					"2022-05-13 14:18:13.728  INFO 21604 --- [ntloop-thread-0] io.vertx.mqtt.impl.MqttClientImpl        : Connection with 192.168.101.128:1883 established successfully\n" +
					"2022-05-13 14:18:15.656  INFO 21604 --- [ntloop-thread-0] i.t.pool.runner.DataComputeSendService   : scada数据发送连接EMQ服务器成功\n" +
					"2022-05-13 14:18:21.977  INFO 21604 --- [ntloop-thread-0] io.vertx.mqtt.impl.MqttClientImpl        : Connection with 192.168.101.128:1883 established successfully\n" +
					"2022-05-13 14:18:23.752  INFO 21604 --- [ntloop-thread-0] i.t.pool.runner.DataComputeSendService   : scada数据发送连接EMQ服务器成功\n" +
					"Disconnected from the target VM, address: '127.0.0.1:54297', transport: 'socket'\n" +
					"2022-05-13 14:18:29.274  INFO 21604 --- [ntloop-thread-0] i.t.pool.runner.DataComputeSendService   : scada数据发送连接EMQ服务器成功\n" +
					"2022-05-13 14:18:29.587  INFO 21604 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Shutting down ExecutorService 'applicationTaskExecutor'\n", SerializerFeature.DisableCircularReferenceDetect)), MqttQoS.AT_MOST_ONCE, false, false);
		}
	}

	public void publish(String jsonStr, String topicName){
		if(sendClient != null && sendClient.isConnected()) {
			sendClient.publish(topicName, Buffer.buffer(jsonStr), MqttQoS.AT_MOST_ONCE, false, false);
		}
	}

	public void publishEmq(String str, String topicName){
		if(sendClient != null && sendClient.isConnected()){
			sendClient.publish(topicName, Buffer.buffer(str), MqttQoS.AT_MOST_ONCE, false, false);
		}
	}
}
